tokio\sync\mpsc/bounded.rs
1use crate::loom::sync::Arc;
2use crate::sync::batch_semaphore::{self as semaphore, TryAcquireError};
3use crate::sync::mpsc::chan;
4use crate::sync::mpsc::error::{SendError, TryRecvError, TrySendError};
5
6cfg_time! {
7 use crate::sync::mpsc::error::SendTimeoutError;
8 use crate::time::Duration;
9}
10
11use std::fmt;
12use std::task::{Context, Poll};
13
/// Sends values to the associated `Receiver`.
///
/// Instances are created by the [`channel`] function.
///
/// To convert the `Sender` into a `Sink` or use it in a poll function, you can
/// use the [`PollSender`] utility.
///
/// [`PollSender`]: https://docs.rs/tokio-util/latest/tokio_util/sync/struct.PollSender.html
pub struct Sender<T> {
    /// The sending half of the underlying channel, parameterized by the
    /// bounded-channel `Semaphore`.
    chan: chan::Tx<T, Semaphore>,
}
25
/// A sender that does not prevent the channel from being closed.
///
/// If all [`Sender`] instances of a channel were dropped and only `WeakSender`
/// instances remain, the channel is closed.
///
/// In order to send messages, the `WeakSender` needs to be upgraded using
/// [`WeakSender::upgrade`], which returns `Option<Sender>`. It returns `None`
/// if all `Sender`s have been dropped, and otherwise it returns a `Sender`.
///
/// [`Sender`]: Sender
/// [`WeakSender::upgrade`]: WeakSender::upgrade
///
/// # Examples
///
/// ```
/// use tokio::sync::mpsc::channel;
///
/// # #[tokio::main(flavor = "current_thread")]
/// # async fn main() {
/// let (tx, _rx) = channel::<i32>(15);
/// let tx_weak = tx.downgrade();
///
/// // Upgrading will succeed because `tx` still exists.
/// assert!(tx_weak.upgrade().is_some());
///
/// // If we drop `tx`, then it will fail.
/// drop(tx);
/// assert!(tx_weak.clone().upgrade().is_none());
/// # }
/// ```
pub struct WeakSender<T> {
    /// Reference-counted handle to the shared channel state. Unlike
    /// [`Sender`], this is held as a plain `Arc` rather than a `chan::Tx`.
    chan: Arc<chan::Chan<T, Semaphore>>,
}
59
/// Permits to send one value into the channel.
///
/// `Permit` values are returned by [`Sender::reserve()`] and [`Sender::try_reserve()`]
/// and are used to guarantee channel capacity before generating a message to send.
///
/// [`Sender::reserve()`]: Sender::reserve
/// [`Sender::try_reserve()`]: Sender::try_reserve
pub struct Permit<'a, T> {
    /// Borrowed sending half of the channel; the `'a` lifetime ties the
    /// permit to that borrow.
    chan: &'a chan::Tx<T, Semaphore>,
}
70
/// An [`Iterator`] of [`Permit`] that can be used to hold `n` slots in the channel.
///
/// `PermitIterator` values are returned by [`Sender::reserve_many()`] and [`Sender::try_reserve_many()`]
/// and are used to guarantee channel capacity before generating `n` messages to send.
///
/// [`Sender::reserve_many()`]: Sender::reserve_many
/// [`Sender::try_reserve_many()`]: Sender::try_reserve_many
pub struct PermitIterator<'a, T> {
    /// Borrowed sending half of the channel.
    chan: &'a chan::Tx<T, Semaphore>,
    /// Number of slots held by this iterator.
    // NOTE(review): presumably decremented as permits are yielded — confirm
    // against the `Iterator` impl, which is not visible here.
    n: usize,
}
82
/// Owned permit to send one value into the channel.
///
/// This is identical to the [`Permit`] type, except that it moves the sender
/// rather than borrowing it.
///
/// `OwnedPermit` values are returned by [`Sender::reserve_owned()`] and
/// [`Sender::try_reserve_owned()`] and are used to guarantee channel capacity
/// before generating a message to send.
///
/// [`Permit`]: Permit
/// [`Sender::reserve_owned()`]: Sender::reserve_owned
/// [`Sender::try_reserve_owned()`]: Sender::try_reserve_owned
pub struct OwnedPermit<T> {
    /// Owned sending half of the channel.
    // NOTE(review): the `Option` presumably allows the sender to be moved out
    // when the permit is consumed or released — confirm against the impl.
    chan: Option<chan::Tx<T, Semaphore>>,
}
98
/// Receives values from the associated `Sender`.
///
/// Instances are created by the [`channel`] function.
///
/// This receiver can be turned into a `Stream` using [`ReceiverStream`].
///
/// [`ReceiverStream`]: https://docs.rs/tokio-stream/0.1/tokio_stream/wrappers/struct.ReceiverStream.html
pub struct Receiver<T> {
    /// The receiving half of the underlying channel, parameterized by the
    /// bounded-channel `Semaphore`.
    chan: chan::Rx<T, Semaphore>,
}
110
111/// Creates a bounded mpsc channel for communicating between asynchronous tasks
112/// with backpressure.
113///
114/// The channel will buffer up to the provided number of messages. Once the
115/// buffer is full, attempts to send new messages will wait until a message is
116/// received from the channel. The provided buffer capacity must be at least 1.
117///
118/// All data sent on `Sender` will become available on `Receiver` in the same
119/// order as it was sent.
120///
121/// The `Sender` can be cloned to `send` to the same channel from multiple code
122/// locations. Only one `Receiver` is supported.
123///
124/// If the `Receiver` is disconnected while trying to `send`, the `send` method
125/// will return a `SendError`. Similarly, if `Sender` is disconnected while
126/// trying to `recv`, the `recv` method will return `None`.
127///
128/// # Panics
129///
130/// Panics if the buffer capacity is 0, or too large. Currently the maximum
131/// capacity is [`Semaphore::MAX_PERMITS`].
132///
133/// [`Semaphore::MAX_PERMITS`]: crate::sync::Semaphore::MAX_PERMITS
134///
135/// # Examples
136///
137/// ```rust
138/// use tokio::sync::mpsc;
139///
140/// # #[tokio::main(flavor = "current_thread")]
141/// # async fn main() {
142/// let (tx, mut rx) = mpsc::channel(100);
143///
144/// tokio::spawn(async move {
145/// for i in 0..10 {
146/// if let Err(_) = tx.send(i).await {
147/// println!("receiver dropped");
148/// return;
149/// }
150/// }
151/// });
152///
153/// while let Some(i) = rx.recv().await {
154/// println!("got = {}", i);
155/// }
156/// # }
157/// ```
158#[track_caller]
159pub fn channel<T>(buffer: usize) -> (Sender<T>, Receiver<T>) {
160 assert!(buffer > 0, "mpsc bounded channel requires buffer > 0");
161 let semaphore = Semaphore {
162 semaphore: semaphore::Semaphore::new(buffer),
163 bound: buffer,
164 };
165 let (tx, rx) = chan::channel(semaphore);
166
167 let tx = Sender::new(tx);
168 let rx = Receiver::new(rx);
169
170 (tx, rx)
171}
172
/// Channel semaphore: pairs the semaphore implementation with a `usize`
/// representing the channel bound.
#[derive(Debug)]
pub(crate) struct Semaphore {
    /// Permit pool; [`channel`] creates it with `buffer` permits.
    pub(crate) semaphore: semaphore::Semaphore,
    /// Buffer size the channel was created with, as reported by
    /// `Receiver::max_capacity`.
    pub(crate) bound: usize,
}
180
181impl<T> Receiver<T> {
182 pub(crate) fn new(chan: chan::Rx<T, Semaphore>) -> Receiver<T> {
183 Receiver { chan }
184 }
185
186 /// Receives the next value for this receiver.
187 ///
188 /// This method returns `None` if the channel has been closed and there are
189 /// no remaining messages in the channel's buffer. This indicates that no
190 /// further values can ever be received from this `Receiver`. The channel is
191 /// closed when all senders have been dropped, or when [`close`] is called.
192 ///
193 /// If there are no messages in the channel's buffer, but the channel has
194 /// not yet been closed, this method will sleep until a message is sent or
195 /// the channel is closed. Note that if [`close`] is called, but there are
196 /// still outstanding [`Permits`] from before it was closed, the channel is
197 /// not considered closed by `recv` until the permits are released.
198 ///
199 /// # Cancel safety
200 ///
201 /// This method is cancel safe. If `recv` is used as the event in a
202 /// [`tokio::select!`](crate::select) statement and some other branch
203 /// completes first, it is guaranteed that no messages were received on this
204 /// channel.
205 ///
206 /// [`close`]: Self::close
207 /// [`Permits`]: struct@crate::sync::mpsc::Permit
208 ///
209 /// # Examples
210 ///
211 /// ```
212 /// use tokio::sync::mpsc;
213 ///
214 /// # #[tokio::main(flavor = "current_thread")]
215 /// # async fn main() {
216 /// let (tx, mut rx) = mpsc::channel(100);
217 ///
218 /// tokio::spawn(async move {
219 /// tx.send("hello").await.unwrap();
220 /// });
221 ///
222 /// assert_eq!(Some("hello"), rx.recv().await);
223 /// assert_eq!(None, rx.recv().await);
224 /// # }
225 /// ```
226 ///
227 /// Values are buffered:
228 ///
229 /// ```
230 /// use tokio::sync::mpsc;
231 ///
232 /// # #[tokio::main(flavor = "current_thread")]
233 /// # async fn main() {
234 /// let (tx, mut rx) = mpsc::channel(100);
235 ///
236 /// tx.send("hello").await.unwrap();
237 /// tx.send("world").await.unwrap();
238 ///
239 /// assert_eq!(Some("hello"), rx.recv().await);
240 /// assert_eq!(Some("world"), rx.recv().await);
241 /// # }
242 /// ```
243 pub async fn recv(&mut self) -> Option<T> {
244 use std::future::poll_fn;
245 poll_fn(|cx| self.chan.recv(cx)).await
246 }
247
248 /// Receives the next values for this receiver and extends `buffer`.
249 ///
250 /// This method extends `buffer` by no more than a fixed number of values
251 /// as specified by `limit`. If `limit` is zero, the function immediately
252 /// returns `0`. The return value is the number of values added to `buffer`.
253 ///
254 /// For `limit > 0`, if there are no messages in the channel's queue, but
255 /// the channel has not yet been closed, this method will sleep until a
256 /// message is sent or the channel is closed. Note that if [`close`] is
257 /// called, but there are still outstanding [`Permits`] from before it was
258 /// closed, the channel is not considered closed by `recv_many` until the
259 /// permits are released.
260 ///
261 /// For non-zero values of `limit`, this method will never return `0` unless
262 /// the channel has been closed and there are no remaining messages in the
263 /// channel's queue. This indicates that no further values can ever be
264 /// received from this `Receiver`. The channel is closed when all senders
265 /// have been dropped, or when [`close`] is called.
266 ///
267 /// The capacity of `buffer` is increased as needed.
268 ///
269 /// # Cancel safety
270 ///
271 /// This method is cancel safe. If `recv_many` is used as the event in a
272 /// [`tokio::select!`](crate::select) statement and some other branch
273 /// completes first, it is guaranteed that no messages were received on this
274 /// channel.
275 ///
276 /// [`close`]: Self::close
277 /// [`Permits`]: struct@crate::sync::mpsc::Permit
278 ///
279 /// # Examples
280 ///
281 /// ```
282 /// use tokio::sync::mpsc;
283 ///
284 /// # #[tokio::main(flavor = "current_thread")]
285 /// # async fn main() {
286 /// let mut buffer: Vec<&str> = Vec::with_capacity(2);
287 /// let limit = 2;
288 /// let (tx, mut rx) = mpsc::channel(100);
289 /// let tx2 = tx.clone();
290 /// tx2.send("first").await.unwrap();
291 /// tx2.send("second").await.unwrap();
292 /// tx2.send("third").await.unwrap();
293 ///
294 /// // Call `recv_many` to receive up to `limit` (2) values.
295 /// assert_eq!(2, rx.recv_many(&mut buffer, limit).await);
296 /// assert_eq!(vec!["first", "second"], buffer);
297 ///
298 /// // If the buffer is full, the next call to `recv_many`
299 /// // reserves additional capacity.
300 /// assert_eq!(1, rx.recv_many(&mut buffer, 1).await);
301 ///
302 /// tokio::spawn(async move {
303 /// tx.send("fourth").await.unwrap();
304 /// });
305 ///
    /// // `tx` now lives in the spawned task, but `recv_many`
    /// // is guaranteed not to return 0 because `tx2` keeps
    /// // the channel open.
309 /// assert_eq!(1, rx.recv_many(&mut buffer, 1).await);
310 /// assert_eq!(vec!["first", "second", "third", "fourth"], buffer);
311 ///
312 /// // Once the last sender is dropped, the channel is
313 /// // closed and `recv_many` returns 0, capacity unchanged.
314 /// drop(tx2);
315 /// assert_eq!(0, rx.recv_many(&mut buffer, limit).await);
316 /// assert_eq!(vec!["first", "second", "third", "fourth"], buffer);
317 /// # }
318 /// ```
319 pub async fn recv_many(&mut self, buffer: &mut Vec<T>, limit: usize) -> usize {
320 use std::future::poll_fn;
321 poll_fn(|cx| self.chan.recv_many(cx, buffer, limit)).await
322 }
323
324 /// Tries to receive the next value for this receiver.
325 ///
326 /// This method returns the [`Empty`] error if the channel is currently
327 /// empty, but there are still outstanding [senders] or [permits].
328 ///
329 /// This method returns the [`Disconnected`] error if the channel is
330 /// currently empty, and there are no outstanding [senders] or [permits].
331 ///
332 /// Unlike the [`poll_recv`] method, this method will never return an
333 /// [`Empty`] error spuriously.
334 ///
335 /// [`Empty`]: crate::sync::mpsc::error::TryRecvError::Empty
336 /// [`Disconnected`]: crate::sync::mpsc::error::TryRecvError::Disconnected
337 /// [`poll_recv`]: Self::poll_recv
338 /// [senders]: crate::sync::mpsc::Sender
339 /// [permits]: crate::sync::mpsc::Permit
340 ///
341 /// # Examples
342 ///
343 /// ```
344 /// use tokio::sync::mpsc;
345 /// use tokio::sync::mpsc::error::TryRecvError;
346 ///
347 /// # #[tokio::main(flavor = "current_thread")]
348 /// # async fn main() {
349 /// let (tx, mut rx) = mpsc::channel(100);
350 ///
351 /// tx.send("hello").await.unwrap();
352 ///
353 /// assert_eq!(Ok("hello"), rx.try_recv());
354 /// assert_eq!(Err(TryRecvError::Empty), rx.try_recv());
355 ///
356 /// tx.send("hello").await.unwrap();
357 /// // Drop the last sender, closing the channel.
358 /// drop(tx);
359 ///
360 /// assert_eq!(Ok("hello"), rx.try_recv());
361 /// assert_eq!(Err(TryRecvError::Disconnected), rx.try_recv());
362 /// # }
363 /// ```
    pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
        // Thin wrapper over the channel receiver's `try_recv`.
        self.chan.try_recv()
    }
367
368 /// Blocking receive to call outside of asynchronous contexts.
369 ///
370 /// This method returns `None` if the channel has been closed and there are
371 /// no remaining messages in the channel's buffer. This indicates that no
372 /// further values can ever be received from this `Receiver`. The channel is
373 /// closed when all senders have been dropped, or when [`close`] is called.
374 ///
375 /// If there are no messages in the channel's buffer, but the channel has
376 /// not yet been closed, this method will block until a message is sent or
377 /// the channel is closed.
378 ///
379 /// This method is intended for use cases where you are sending from
380 /// asynchronous code to synchronous code, and will work even if the sender
381 /// is not using [`blocking_send`] to send the message.
382 ///
383 /// Note that if [`close`] is called, but there are still outstanding
384 /// [`Permits`] from before it was closed, the channel is not considered
385 /// closed by `blocking_recv` until the permits are released.
386 ///
387 /// [`close`]: Self::close
388 /// [`Permits`]: struct@crate::sync::mpsc::Permit
389 /// [`blocking_send`]: fn@crate::sync::mpsc::Sender::blocking_send
390 ///
391 /// # Panics
392 ///
393 /// This function panics if called within an asynchronous execution
394 /// context.
395 ///
396 /// # Examples
397 ///
398 /// ```
399 /// # #[cfg(not(target_family = "wasm"))]
400 /// # {
401 /// use std::thread;
402 /// use tokio::runtime::Runtime;
403 /// use tokio::sync::mpsc;
404 ///
405 /// fn main() {
406 /// let (tx, mut rx) = mpsc::channel::<u8>(10);
407 ///
408 /// let sync_code = thread::spawn(move || {
409 /// assert_eq!(Some(10), rx.blocking_recv());
410 /// });
411 ///
412 /// Runtime::new()
413 /// .unwrap()
414 /// .block_on(async move {
415 /// let _ = tx.send(10).await;
416 /// });
417 /// sync_code.join().unwrap()
418 /// }
419 /// # }
420 /// ```
421 #[track_caller]
422 #[cfg(feature = "sync")]
423 #[cfg_attr(docsrs, doc(alias = "recv_blocking"))]
424 pub fn blocking_recv(&mut self) -> Option<T> {
425 crate::future::block_on(self.recv())
426 }
427
428 /// Variant of [`Self::recv_many`] for blocking contexts.
429 ///
430 /// The same conditions as in [`Self::blocking_recv`] apply.
431 #[track_caller]
432 #[cfg(feature = "sync")]
433 #[cfg_attr(docsrs, doc(alias = "recv_many_blocking"))]
434 pub fn blocking_recv_many(&mut self, buffer: &mut Vec<T>, limit: usize) -> usize {
435 crate::future::block_on(self.recv_many(buffer, limit))
436 }
437
438 /// Closes the receiving half of a channel without dropping it.
439 ///
440 /// This prevents any further messages from being sent on the channel while
441 /// still enabling the receiver to drain messages that are buffered. Any
442 /// outstanding [`Permit`] values will still be able to send messages.
443 ///
444 /// To guarantee that no messages are dropped, after calling `close()`,
445 /// `recv()` must be called until `None` is returned. If there are
446 /// outstanding [`Permit`] or [`OwnedPermit`] values, the `recv` method will
447 /// not return `None` until those are released.
448 ///
449 /// [`Permit`]: Permit
450 /// [`OwnedPermit`]: OwnedPermit
451 ///
452 /// # Examples
453 ///
454 /// ```
455 /// use tokio::sync::mpsc;
456 ///
457 /// # #[tokio::main(flavor = "current_thread")]
458 /// # async fn main() {
459 /// let (tx, mut rx) = mpsc::channel(20);
460 ///
461 /// tokio::spawn(async move {
462 /// let mut i = 0;
463 /// while let Ok(permit) = tx.reserve().await {
464 /// permit.send(i);
465 /// i += 1;
466 /// }
467 /// });
468 ///
469 /// rx.close();
470 ///
471 /// while let Some(msg) = rx.recv().await {
472 /// println!("got {}", msg);
473 /// }
474 ///
475 /// // Channel closed and no messages are lost.
476 /// # }
477 /// ```
    pub fn close(&mut self) {
        // Forwarded to the channel receiver, which performs the close.
        self.chan.close();
    }
481
482 /// Checks if a channel is closed.
483 ///
    /// This method returns `true` if the channel has been closed. The channel is closed
    /// when all [`Sender`] handles have been dropped, or when [`Receiver::close`] is called.
486 ///
487 /// [`Sender`]: crate::sync::mpsc::Sender
488 /// [`Receiver::close`]: crate::sync::mpsc::Receiver::close
489 ///
490 /// # Examples
491 /// ```
492 /// use tokio::sync::mpsc;
493 ///
494 /// # #[tokio::main(flavor = "current_thread")]
495 /// # async fn main() {
496 /// let (_tx, mut rx) = mpsc::channel::<()>(10);
497 /// assert!(!rx.is_closed());
498 ///
499 /// rx.close();
500 ///
501 /// assert!(rx.is_closed());
502 /// # }
503 /// ```
    pub fn is_closed(&self) -> bool {
        // Thin wrapper over the channel receiver's `is_closed`.
        self.chan.is_closed()
    }
507
508 /// Checks if a channel is empty.
509 ///
510 /// This method returns `true` if the channel has no messages.
511 ///
512 /// # Examples
513 /// ```
514 /// use tokio::sync::mpsc;
515 ///
516 /// # #[tokio::main(flavor = "current_thread")]
517 /// # async fn main() {
518 /// let (tx, rx) = mpsc::channel(10);
519 /// assert!(rx.is_empty());
520 ///
521 /// tx.send(0).await.unwrap();
522 /// assert!(!rx.is_empty());
523 /// # }
    /// ```
    pub fn is_empty(&self) -> bool {
        // Thin wrapper over the channel receiver's `is_empty`.
        self.chan.is_empty()
    }
529
530 /// Returns the number of messages in the channel.
531 ///
532 /// # Examples
533 /// ```
534 /// use tokio::sync::mpsc;
535 ///
536 /// # #[tokio::main(flavor = "current_thread")]
537 /// # async fn main() {
538 /// let (tx, rx) = mpsc::channel(10);
539 /// assert_eq!(0, rx.len());
540 ///
541 /// tx.send(0).await.unwrap();
542 /// assert_eq!(1, rx.len());
543 /// # }
544 /// ```
    pub fn len(&self) -> usize {
        // Thin wrapper over the channel receiver's `len`.
        self.chan.len()
    }
548
549 /// Returns the current capacity of the channel.
550 ///
551 /// The capacity goes down when the sender sends a value by calling [`Sender::send`] or by reserving
552 /// capacity with [`Sender::reserve`]. The capacity goes up when values are received.
    /// This is distinct from [`max_capacity`], which always returns the buffer capacity
    /// initially specified when calling [`channel`].
555 ///
556 /// # Examples
557 ///
558 /// ```
559 /// use tokio::sync::mpsc;
560 ///
561 /// # #[tokio::main(flavor = "current_thread")]
562 /// # async fn main() {
563 /// let (tx, mut rx) = mpsc::channel::<()>(5);
564 ///
565 /// assert_eq!(rx.capacity(), 5);
566 ///
567 /// // Making a reservation drops the capacity by one.
568 /// let permit = tx.reserve().await.unwrap();
569 /// assert_eq!(rx.capacity(), 4);
570 /// assert_eq!(rx.len(), 0);
571 ///
572 /// // Sending and receiving a value increases the capacity by one.
573 /// permit.send(());
574 /// assert_eq!(rx.len(), 1);
575 /// rx.recv().await.unwrap();
576 /// assert_eq!(rx.capacity(), 5);
577 ///
578 /// // Directly sending a message drops the capacity by one.
579 /// tx.send(()).await.unwrap();
580 /// assert_eq!(rx.capacity(), 4);
581 /// assert_eq!(rx.len(), 1);
582 ///
583 /// // Receiving the message increases the capacity by one.
584 /// rx.recv().await.unwrap();
585 /// assert_eq!(rx.capacity(), 5);
586 /// assert_eq!(rx.len(), 0);
587 /// # }
588 /// ```
589 /// [`capacity`]: Receiver::capacity
590 /// [`max_capacity`]: Receiver::max_capacity
591 pub fn capacity(&self) -> usize {
592 self.chan.semaphore().semaphore.available_permits()
593 }
594
595 /// Returns the maximum buffer capacity of the channel.
596 ///
597 /// The maximum capacity is the buffer capacity initially specified when calling
598 /// [`channel`]. This is distinct from [`capacity`], which returns the *current*
599 /// available buffer capacity: as messages are sent and received, the value
600 /// returned by [`capacity`] will go up or down, whereas the value
601 /// returned by [`max_capacity`] will remain constant.
602 ///
603 /// # Examples
604 ///
605 /// ```
606 /// use tokio::sync::mpsc;
607 ///
608 /// # #[tokio::main(flavor = "current_thread")]
609 /// # async fn main() {
610 /// let (tx, rx) = mpsc::channel::<()>(5);
611 ///
612 /// // both max capacity and capacity are the same at first
613 /// assert_eq!(rx.max_capacity(), 5);
614 /// assert_eq!(rx.capacity(), 5);
615 ///
616 /// // Making a reservation doesn't change the max capacity.
617 /// let permit = tx.reserve().await.unwrap();
618 /// assert_eq!(rx.max_capacity(), 5);
619 /// // but drops the capacity by one
620 /// assert_eq!(rx.capacity(), 4);
621 /// # }
622 /// ```
623 /// [`capacity`]: Receiver::capacity
624 /// [`max_capacity`]: Receiver::max_capacity
625 pub fn max_capacity(&self) -> usize {
626 self.chan.semaphore().bound
627 }
628
629 /// Polls to receive the next message on this channel.
630 ///
631 /// This method returns:
632 ///
633 /// * `Poll::Pending` if no messages are available but the channel is not
634 /// closed, or if a spurious failure happens.
635 /// * `Poll::Ready(Some(message))` if a message is available.
636 /// * `Poll::Ready(None)` if the channel has been closed and all messages
637 /// sent before it was closed have been received.
638 ///
639 /// When the method returns `Poll::Pending`, the `Waker` in the provided
    /// `Context` is scheduled to receive a wakeup when a message is sent by any
    /// sender, or when the channel is closed. Note that on multiple calls to
642 /// `poll_recv` or `poll_recv_many`, only the `Waker` from the `Context`
643 /// passed to the most recent call is scheduled to receive a wakeup.
644 ///
645 /// If this method returns `Poll::Pending` due to a spurious failure, then
646 /// the `Waker` will be notified when the situation causing the spurious
647 /// failure has been resolved. Note that receiving such a wakeup does not
648 /// guarantee that the next call will succeed — it could fail with another
649 /// spurious failure.
    pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
        // Polls the underlying channel receiver once with the caller's context.
        self.chan.recv(cx)
    }
653
654 /// Polls to receive multiple messages on this channel, extending the provided buffer.
655 ///
656 /// This method returns:
657 /// * `Poll::Pending` if no messages are available but the channel is not closed, or if a
658 /// spurious failure happens.
659 /// * `Poll::Ready(count)` where `count` is the number of messages successfully received and
660 /// stored in `buffer`. This can be less than, or equal to, `limit`.
661 /// * `Poll::Ready(0)` if `limit` is set to zero or when the channel is closed.
662 ///
663 /// When the method returns `Poll::Pending`, the `Waker` in the provided
    /// `Context` is scheduled to receive a wakeup when a message is sent by any
    /// sender, or when the channel is closed. Note that on multiple calls to
666 /// `poll_recv` or `poll_recv_many`, only the `Waker` from the `Context`
667 /// passed to the most recent call is scheduled to receive a wakeup.
668 ///
669 /// Note that this method does not guarantee that exactly `limit` messages
670 /// are received. Rather, if at least one message is available, it returns
671 /// as many messages as it can up to the given limit. This method returns
672 /// zero only if the channel is closed (or if `limit` is zero).
673 ///
674 /// # Examples
675 ///
676 /// ```
677 /// use std::task::{Context, Poll};
678 /// use std::pin::Pin;
679 /// use tokio::sync::mpsc;
680 /// use futures::Future;
681 ///
682 /// struct MyReceiverFuture<'a> {
683 /// receiver: mpsc::Receiver<i32>,
684 /// buffer: &'a mut Vec<i32>,
685 /// limit: usize,
686 /// }
687 ///
688 /// impl<'a> Future for MyReceiverFuture<'a> {
689 /// type Output = usize; // Number of messages received
690 ///
691 /// fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
692 /// let MyReceiverFuture { receiver, buffer, limit } = &mut *self;
693 ///
694 /// // Now `receiver` and `buffer` are mutable references, and `limit` is copied
695 /// match receiver.poll_recv_many(cx, *buffer, *limit) {
696 /// Poll::Pending => Poll::Pending,
697 /// Poll::Ready(count) => Poll::Ready(count),
698 /// }
699 /// }
700 /// }
701 ///
702 /// # #[tokio::main(flavor = "current_thread")]
703 /// # async fn main() {
704 /// let (tx, rx) = mpsc::channel(32);
705 /// let mut buffer = Vec::new();
706 ///
707 /// let my_receiver_future = MyReceiverFuture {
708 /// receiver: rx,
709 /// buffer: &mut buffer,
710 /// limit: 3,
711 /// };
712 ///
713 /// for i in 0..10 {
714 /// tx.send(i).await.unwrap();
715 /// }
716 ///
717 /// let count = my_receiver_future.await;
718 /// assert_eq!(count, 3);
719 /// assert_eq!(buffer, vec![0,1,2])
720 /// # }
721 /// ```
    pub fn poll_recv_many(
        &mut self,
        cx: &mut Context<'_>,
        buffer: &mut Vec<T>,
        limit: usize,
    ) -> Poll<usize> {
        // Polls the underlying channel receiver once, passing `buffer` and
        // `limit` through unchanged.
        self.chan.recv_many(cx, buffer, limit)
    }
730
731 /// Returns the number of [`Sender`] handles.
    pub fn sender_strong_count(&self) -> usize {
        // The count is reported by the underlying channel receiver.
        self.chan.sender_strong_count()
    }
735
736 /// Returns the number of [`WeakSender`] handles.
    pub fn sender_weak_count(&self) -> usize {
        // The count is reported by the underlying channel receiver.
        self.chan.sender_weak_count()
    }
740}
741
742impl<T> fmt::Debug for Receiver<T> {
743 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
744 fmt.debug_struct("Receiver")
745 .field("chan", &self.chan)
746 .finish()
747 }
748}
749
// `Receiver<T>` is declared `Unpin` for every `T`; the impl places no bound on `T`.
impl<T> Unpin for Receiver<T> {}
751
752impl<T> Sender<T> {
753 pub(crate) fn new(chan: chan::Tx<T, Semaphore>) -> Sender<T> {
754 Sender { chan }
755 }
756
757 /// Sends a value, waiting until there is capacity.
758 ///
759 /// A successful send occurs when it is determined that the other end of the
760 /// channel has not hung up already. An unsuccessful send would be one where
761 /// the corresponding receiver has already been closed. Note that a return
762 /// value of `Err` means that the data will never be received, but a return
763 /// value of `Ok` does not mean that the data will be received. It is
764 /// possible for the corresponding receiver to hang up immediately after
765 /// this function returns `Ok`.
766 ///
767 /// # Errors
768 ///
769 /// If the receive half of the channel is closed, either due to [`close`]
770 /// being called or the [`Receiver`] handle dropping, the function returns
771 /// an error. The error includes the value passed to `send`.
772 ///
773 /// [`close`]: Receiver::close
774 /// [`Receiver`]: Receiver
775 ///
776 /// # Cancel safety
777 ///
778 /// If `send` is used as the event in a [`tokio::select!`](crate::select)
779 /// statement and some other branch completes first, then it is guaranteed
780 /// that the message was not sent. **However, in that case, the message
781 /// is dropped and will be lost.**
782 ///
783 /// To avoid losing messages, use [`reserve`](Self::reserve) to reserve
784 /// capacity, then use the returned [`Permit`] to send the message.
785 ///
786 /// This channel uses a queue to ensure that calls to `send` and `reserve`
787 /// complete in the order they were requested. Cancelling a call to
788 /// `send` makes you lose your place in the queue.
789 ///
790 /// # Examples
791 ///
792 /// In the following example, each call to `send` will block until the
793 /// previously sent value was received.
794 ///
795 /// ```rust
796 /// use tokio::sync::mpsc;
797 ///
798 /// # #[tokio::main(flavor = "current_thread")]
799 /// # async fn main() {
800 /// let (tx, mut rx) = mpsc::channel(1);
801 ///
802 /// tokio::spawn(async move {
803 /// for i in 0..10 {
804 /// if let Err(_) = tx.send(i).await {
805 /// println!("receiver dropped");
806 /// return;
807 /// }
808 /// }
809 /// });
810 ///
811 /// while let Some(i) = rx.recv().await {
812 /// println!("got = {}", i);
813 /// }
814 /// # }
815 /// ```
816 pub async fn send(&self, value: T) -> Result<(), SendError<T>> {
817 match self.reserve().await {
818 Ok(permit) => {
819 permit.send(value);
820 Ok(())
821 }
822 Err(_) => Err(SendError(value)),
823 }
824 }
825
826 /// Completes when the receiver has dropped.
827 ///
828 /// This allows the producers to get notified when interest in the produced
829 /// values is canceled and immediately stop doing work.
830 ///
831 /// # Cancel safety
832 ///
833 /// This method is cancel safe. Once the channel is closed, it stays closed
834 /// forever and all future calls to `closed` will return immediately.
835 ///
836 /// # Examples
837 ///
838 /// ```
839 /// use tokio::sync::mpsc;
840 ///
841 /// # #[tokio::main(flavor = "current_thread")]
842 /// # async fn main() {
843 /// let (tx1, rx) = mpsc::channel::<()>(1);
844 /// let tx2 = tx1.clone();
845 /// let tx3 = tx1.clone();
846 /// let tx4 = tx1.clone();
847 /// let tx5 = tx1.clone();
848 /// tokio::spawn(async move {
849 /// drop(rx);
850 /// });
851 ///
852 /// futures::join!(
853 /// tx1.closed(),
854 /// tx2.closed(),
855 /// tx3.closed(),
856 /// tx4.closed(),
857 /// tx5.closed()
858 /// );
859 /// println!("Receiver dropped");
860 /// # }
861 /// ```
    pub async fn closed(&self) {
        // Awaits the `closed` future of the underlying channel sender.
        self.chan.closed().await;
    }
865
    /// Attempts to immediately send a message on this `Sender`.
    ///
    /// This method differs from [`send`] by returning an error immediately if
    /// the channel's buffer is full, rather than waiting for capacity to become
    /// available. Compared with [`send`], this function has two failure cases
    /// instead of one (one for disconnection, one for a full buffer).
872 ///
873 /// # Errors
874 ///
875 /// If the channel capacity has been reached, i.e., the channel has `n`
876 /// buffered values where `n` is the argument passed to [`channel`], then an
877 /// error is returned.
878 ///
879 /// If the receive half of the channel is closed, either due to [`close`]
880 /// being called or the [`Receiver`] handle dropping, the function returns
    /// an error. The error includes the value passed to `try_send`.
882 ///
883 /// [`send`]: Sender::send
884 /// [`channel`]: channel
885 /// [`close`]: Receiver::close
886 ///
887 /// # Examples
888 ///
889 /// ```
890 /// use tokio::sync::mpsc;
891 ///
892 /// # #[tokio::main(flavor = "current_thread")]
893 /// # async fn main() {
894 /// // Create a channel with buffer size 1
895 /// let (tx1, mut rx) = mpsc::channel(1);
896 /// let tx2 = tx1.clone();
897 ///
898 /// tokio::spawn(async move {
899 /// tx1.send(1).await.unwrap();
900 /// tx1.send(2).await.unwrap();
901 /// // task waits until the receiver receives a value.
902 /// });
903 ///
904 /// tokio::spawn(async move {
905 /// // This will return an error and send
906 /// // no message if the buffer is full
907 /// let _ = tx2.try_send(3);
908 /// });
909 ///
910 /// let mut msg;
911 /// msg = rx.recv().await.unwrap();
912 /// println!("message {} received", msg);
913 ///
914 /// msg = rx.recv().await.unwrap();
915 /// println!("message {} received", msg);
916 ///
917 /// // Third message may have never been sent
918 /// match rx.recv().await {
919 /// Some(msg) => println!("message {} received", msg),
920 /// None => println!("the third message was never sent"),
921 /// }
922 /// # }
923 /// ```
924 pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>> {
925 match self.chan.semaphore().semaphore.try_acquire(1) {
926 Ok(()) => {}
927 Err(TryAcquireError::Closed) => return Err(TrySendError::Closed(message)),
928 Err(TryAcquireError::NoPermits) => return Err(TrySendError::Full(message)),
929 }
930
931 // Send the message
932 self.chan.send(message);
933 Ok(())
934 }
935
936 /// Sends a value, waiting until there is capacity, but only for a limited time.
937 ///
938 /// Shares the same success and error conditions as [`send`], adding one more
939 /// condition for an unsuccessful send, which is when the provided timeout has
940 /// elapsed, and there is no capacity available.
941 ///
942 /// [`send`]: Sender::send
943 ///
944 /// # Errors
945 ///
946 /// If the receive half of the channel is closed, either due to [`close`]
947 /// being called or the [`Receiver`] having been dropped,
    /// the function returns an error. The error includes the value passed to `send_timeout`.
949 ///
950 /// [`close`]: Receiver::close
951 /// [`Receiver`]: Receiver
952 ///
953 /// # Panics
954 ///
955 /// This function panics if it is called outside the context of a Tokio
956 /// runtime [with time enabled](crate::runtime::Builder::enable_time).
957 ///
958 /// # Examples
959 ///
960 /// In the following example, each call to `send_timeout` will block until the
961 /// previously sent value was received, unless the timeout has elapsed.
962 ///
963 /// ```rust
964 /// use tokio::sync::mpsc;
965 /// use tokio::time::{sleep, Duration};
966 ///
967 /// # #[tokio::main(flavor = "current_thread")]
968 /// # async fn main() {
969 /// let (tx, mut rx) = mpsc::channel(1);
970 ///
971 /// tokio::spawn(async move {
972 /// for i in 0..10 {
973 /// if let Err(e) = tx.send_timeout(i, Duration::from_millis(100)).await {
974 /// println!("send error: #{:?}", e);
975 /// return;
976 /// }
977 /// }
978 /// });
979 ///
980 /// while let Some(i) = rx.recv().await {
981 /// println!("got = {}", i);
982 /// sleep(Duration::from_millis(200)).await;
983 /// }
984 /// # }
985 /// ```
986 #[cfg(feature = "time")]
987 #[cfg_attr(docsrs, doc(cfg(feature = "time")))]
988 pub async fn send_timeout(
989 &self,
990 value: T,
991 timeout: Duration,
992 ) -> Result<(), SendTimeoutError<T>> {
993 let permit = match crate::time::timeout(timeout, self.reserve()).await {
994 Err(_) => {
995 return Err(SendTimeoutError::Timeout(value));
996 }
997 Ok(Err(_)) => {
998 return Err(SendTimeoutError::Closed(value));
999 }
1000 Ok(Ok(permit)) => permit,
1001 };
1002
1003 permit.send(value);
1004 Ok(())
1005 }
1006
1007 /// Blocking send to call outside of asynchronous contexts.
1008 ///
1009 /// This method is intended for use cases where you are sending from
1010 /// synchronous code to asynchronous code, and will work even if the
1011 /// receiver is not using [`blocking_recv`] to receive the message.
1012 ///
1013 /// [`blocking_recv`]: fn@crate::sync::mpsc::Receiver::blocking_recv
1014 ///
1015 /// # Panics
1016 ///
1017 /// This function panics if called within an asynchronous execution
1018 /// context.
1019 ///
1020 /// # Examples
1021 ///
1022 /// ```
1023 /// # #[cfg(not(target_family = "wasm"))]
1024 /// # {
1025 /// use std::thread;
1026 /// use tokio::runtime::Runtime;
1027 /// use tokio::sync::mpsc;
1028 ///
1029 /// fn main() {
1030 /// let (tx, mut rx) = mpsc::channel::<u8>(1);
1031 ///
1032 /// let sync_code = thread::spawn(move || {
1033 /// tx.blocking_send(10).unwrap();
1034 /// });
1035 ///
1036 /// Runtime::new().unwrap().block_on(async move {
1037 /// assert_eq!(Some(10), rx.recv().await);
1038 /// });
1039 /// sync_code.join().unwrap()
1040 /// }
1041 /// # }
1042 /// ```
1043 #[track_caller]
1044 #[cfg(feature = "sync")]
1045 #[cfg_attr(docsrs, doc(alias = "send_blocking"))]
1046 pub fn blocking_send(&self, value: T) -> Result<(), SendError<T>> {
1047 crate::future::block_on(self.send(value))
1048 }
1049
1050 /// Checks if the channel has been closed. This happens when the
1051 /// [`Receiver`] is dropped, or when the [`Receiver::close`] method is
1052 /// called.
1053 ///
1054 /// [`Receiver`]: crate::sync::mpsc::Receiver
1055 /// [`Receiver::close`]: crate::sync::mpsc::Receiver::close
1056 ///
1057 /// ```
1058 /// let (tx, rx) = tokio::sync::mpsc::channel::<()>(42);
1059 /// assert!(!tx.is_closed());
1060 ///
1061 /// let tx2 = tx.clone();
1062 /// assert!(!tx2.is_closed());
1063 ///
1064 /// drop(rx);
1065 /// assert!(tx.is_closed());
1066 /// assert!(tx2.is_closed());
1067 /// ```
    pub fn is_closed(&self) -> bool {
        // Pure delegation: the answer comes from the inner channel handle.
        self.chan.is_closed()
    }
1071
1072 /// Waits for channel capacity. Once capacity to send one message is
1073 /// available, it is reserved for the caller.
1074 ///
1075 /// If the channel is full, the function waits for the number of unreceived
1076 /// messages to become less than the channel capacity. Capacity to send one
1077 /// message is reserved for the caller. A [`Permit`] is returned to track
1078 /// the reserved capacity. The [`send`] function on [`Permit`] consumes the
1079 /// reserved capacity.
1080 ///
1081 /// Dropping [`Permit`] without sending a message releases the capacity back
1082 /// to the channel.
1083 ///
1084 /// [`Permit`]: Permit
1085 /// [`send`]: Permit::send
1086 ///
1087 /// # Cancel safety
1088 ///
1089 /// This channel uses a queue to ensure that calls to `send` and `reserve`
1090 /// complete in the order they were requested. Cancelling a call to
1091 /// `reserve` makes you lose your place in the queue.
1092 ///
1093 /// # Examples
1094 ///
1095 /// ```
1096 /// use tokio::sync::mpsc;
1097 ///
1098 /// # #[tokio::main(flavor = "current_thread")]
1099 /// # async fn main() {
1100 /// let (tx, mut rx) = mpsc::channel(1);
1101 ///
1102 /// // Reserve capacity
1103 /// let permit = tx.reserve().await.unwrap();
1104 ///
1105 /// // Trying to send directly on the `tx` will fail due to no
1106 /// // available capacity.
1107 /// assert!(tx.try_send(123).is_err());
1108 ///
1109 /// // Sending on the permit succeeds
1110 /// permit.send(456);
1111 ///
1112 /// // The value sent on the permit is received
1113 /// assert_eq!(rx.recv().await.unwrap(), 456);
1114 /// # }
1115 /// ```
1116 pub async fn reserve(&self) -> Result<Permit<'_, T>, SendError<()>> {
1117 self.reserve_inner(1).await?;
1118 Ok(Permit { chan: &self.chan })
1119 }
1120
1121 /// Waits for channel capacity. Once capacity to send `n` messages is
1122 /// available, it is reserved for the caller.
1123 ///
1124 /// If the channel is full or if there are fewer than `n` permits available, the function waits
1125 /// for the number of unreceived messages to become `n` less than the channel capacity.
    /// Capacity to send `n` messages is then reserved for the caller.
1127 ///
1128 /// A [`PermitIterator`] is returned to track the reserved capacity.
1129 /// You can call this [`Iterator`] until it is exhausted to
1130 /// get a [`Permit`] and then call [`Permit::send`]. This function is similar to
1131 /// [`try_reserve_many`] except it awaits for the slots to become available.
1132 ///
    /// If the channel is closed, or if `n` is greater than the channel's maximum capacity,
    /// the function returns a [`SendError`].
1134 ///
1135 /// Dropping [`PermitIterator`] without consuming it entirely releases the remaining
1136 /// permits back to the channel.
1137 ///
1138 /// [`PermitIterator`]: PermitIterator
1139 /// [`Permit`]: Permit
1140 /// [`send`]: Permit::send
1141 /// [`try_reserve_many`]: Sender::try_reserve_many
1142 ///
1143 /// # Cancel safety
1144 ///
1145 /// This channel uses a queue to ensure that calls to `send` and `reserve_many`
1146 /// complete in the order they were requested. Cancelling a call to
1147 /// `reserve_many` makes you lose your place in the queue.
1148 ///
1149 /// # Examples
1150 ///
1151 /// ```
1152 /// use tokio::sync::mpsc;
1153 ///
1154 /// # #[tokio::main(flavor = "current_thread")]
1155 /// # async fn main() {
1156 /// let (tx, mut rx) = mpsc::channel(2);
1157 ///
1158 /// // Reserve capacity
1159 /// let mut permit = tx.reserve_many(2).await.unwrap();
1160 ///
1161 /// // Trying to send directly on the `tx` will fail due to no
1162 /// // available capacity.
1163 /// assert!(tx.try_send(123).is_err());
1164 ///
1165 /// // Sending with the permit iterator succeeds
1166 /// permit.next().unwrap().send(456);
1167 /// permit.next().unwrap().send(457);
1168 ///
1169 /// // The iterator should now be exhausted
1170 /// assert!(permit.next().is_none());
1171 ///
1172 /// // The value sent on the permit is received
1173 /// assert_eq!(rx.recv().await.unwrap(), 456);
1174 /// assert_eq!(rx.recv().await.unwrap(), 457);
1175 /// # }
1176 /// ```
1177 pub async fn reserve_many(&self, n: usize) -> Result<PermitIterator<'_, T>, SendError<()>> {
1178 self.reserve_inner(n).await?;
1179 Ok(PermitIterator {
1180 chan: &self.chan,
1181 n,
1182 })
1183 }
1184
1185 /// Waits for channel capacity, moving the `Sender` and returning an owned
1186 /// permit. Once capacity to send one message is available, it is reserved
1187 /// for the caller.
1188 ///
1189 /// This moves the sender _by value_, and returns an owned permit that can
1190 /// be used to send a message into the channel. Unlike [`Sender::reserve`],
1191 /// this method may be used in cases where the permit must be valid for the
1192 /// `'static` lifetime. `Sender`s may be cloned cheaply (`Sender::clone` is
1193 /// essentially a reference count increment, comparable to [`Arc::clone`]),
1194 /// so when multiple [`OwnedPermit`]s are needed or the `Sender` cannot be
1195 /// moved, it can be cloned prior to calling `reserve_owned`.
1196 ///
1197 /// If the channel is full, the function waits for the number of unreceived
1198 /// messages to become less than the channel capacity. Capacity to send one
1199 /// message is reserved for the caller. An [`OwnedPermit`] is returned to
1200 /// track the reserved capacity. The [`send`] function on [`OwnedPermit`]
1201 /// consumes the reserved capacity.
1202 ///
1203 /// Dropping the [`OwnedPermit`] without sending a message releases the
1204 /// capacity back to the channel.
1205 ///
1206 /// # Cancel safety
1207 ///
1208 /// This channel uses a queue to ensure that calls to `send` and `reserve`
1209 /// complete in the order they were requested. Cancelling a call to
1210 /// `reserve_owned` makes you lose your place in the queue.
1211 ///
1212 /// # Examples
1213 /// Sending a message using an [`OwnedPermit`]:
1214 /// ```
1215 /// use tokio::sync::mpsc;
1216 ///
1217 /// # #[tokio::main(flavor = "current_thread")]
1218 /// # async fn main() {
1219 /// let (tx, mut rx) = mpsc::channel(1);
1220 ///
1221 /// // Reserve capacity, moving the sender.
1222 /// let permit = tx.reserve_owned().await.unwrap();
1223 ///
1224 /// // Send a message, consuming the permit and returning
1225 /// // the moved sender.
1226 /// let tx = permit.send(123);
1227 ///
1228 /// // The value sent on the permit is received.
1229 /// assert_eq!(rx.recv().await.unwrap(), 123);
1230 ///
1231 /// // The sender can now be used again.
1232 /// tx.send(456).await.unwrap();
1233 /// # }
1234 /// ```
1235 ///
1236 /// When multiple [`OwnedPermit`]s are needed, or the sender cannot be moved
1237 /// by value, it can be inexpensively cloned before calling `reserve_owned`:
1238 ///
1239 /// ```
1240 /// use tokio::sync::mpsc;
1241 ///
1242 /// # #[tokio::main(flavor = "current_thread")]
1243 /// # async fn main() {
1244 /// let (tx, mut rx) = mpsc::channel(1);
1245 ///
1246 /// // Clone the sender and reserve capacity.
1247 /// let permit = tx.clone().reserve_owned().await.unwrap();
1248 ///
1249 /// // Trying to send directly on the `tx` will fail due to no
1250 /// // available capacity.
1251 /// assert!(tx.try_send(123).is_err());
1252 ///
1253 /// // Sending on the permit succeeds.
1254 /// permit.send(456);
1255 ///
1256 /// // The value sent on the permit is received
1257 /// assert_eq!(rx.recv().await.unwrap(), 456);
1258 /// # }
1259 /// ```
1260 ///
1261 /// [`Sender::reserve`]: Sender::reserve
1262 /// [`OwnedPermit`]: OwnedPermit
1263 /// [`send`]: OwnedPermit::send
1264 /// [`Arc::clone`]: std::sync::Arc::clone
1265 pub async fn reserve_owned(self) -> Result<OwnedPermit<T>, SendError<()>> {
1266 self.reserve_inner(1).await?;
1267 Ok(OwnedPermit {
1268 chan: Some(self.chan),
1269 })
1270 }
1271
1272 async fn reserve_inner(&self, n: usize) -> Result<(), SendError<()>> {
1273 crate::trace::async_trace_leaf().await;
1274
1275 if n > self.max_capacity() {
1276 return Err(SendError(()));
1277 }
1278 match self.chan.semaphore().semaphore.acquire(n).await {
1279 Ok(()) => Ok(()),
1280 Err(_) => Err(SendError(())),
1281 }
1282 }
1283
1284 /// Tries to acquire a slot in the channel without waiting for the slot to become
1285 /// available.
1286 ///
1287 /// If the channel is full this function will return [`TrySendError`], otherwise
1288 /// if there is a slot available it will return a [`Permit`] that will then allow you
1289 /// to [`send`] on the channel with a guaranteed slot. This function is similar to
1290 /// [`reserve`] except it does not await for the slot to become available.
1291 ///
1292 /// Dropping [`Permit`] without sending a message releases the capacity back
1293 /// to the channel.
1294 ///
1295 /// [`Permit`]: Permit
1296 /// [`send`]: Permit::send
1297 /// [`reserve`]: Sender::reserve
1298 ///
1299 /// # Examples
1300 ///
1301 /// ```
1302 /// use tokio::sync::mpsc;
1303 ///
1304 /// # #[tokio::main(flavor = "current_thread")]
1305 /// # async fn main() {
1306 /// let (tx, mut rx) = mpsc::channel(1);
1307 ///
1308 /// // Reserve capacity
1309 /// let permit = tx.try_reserve().unwrap();
1310 ///
1311 /// // Trying to send directly on the `tx` will fail due to no
1312 /// // available capacity.
1313 /// assert!(tx.try_send(123).is_err());
1314 ///
1315 /// // Trying to reserve an additional slot on the `tx` will
1316 /// // fail because there is no capacity.
1317 /// assert!(tx.try_reserve().is_err());
1318 ///
1319 /// // Sending on the permit succeeds
1320 /// permit.send(456);
1321 ///
1322 /// // The value sent on the permit is received
1323 /// assert_eq!(rx.recv().await.unwrap(), 456);
1324 ///
1325 /// # }
1326 /// ```
1327 pub fn try_reserve(&self) -> Result<Permit<'_, T>, TrySendError<()>> {
1328 match self.chan.semaphore().semaphore.try_acquire(1) {
1329 Ok(()) => {}
1330 Err(TryAcquireError::Closed) => return Err(TrySendError::Closed(())),
1331 Err(TryAcquireError::NoPermits) => return Err(TrySendError::Full(())),
1332 }
1333
1334 Ok(Permit { chan: &self.chan })
1335 }
1336
1337 /// Tries to acquire `n` slots in the channel without waiting for the slot to become
1338 /// available.
1339 ///
1340 /// A [`PermitIterator`] is returned to track the reserved capacity.
1341 /// You can call this [`Iterator`] until it is exhausted to
1342 /// get a [`Permit`] and then call [`Permit::send`]. This function is similar to
1343 /// [`reserve_many`] except it does not await for the slots to become available.
1344 ///
1345 /// If there are fewer than `n` permits available on the channel, then
1346 /// this function will return a [`TrySendError::Full`]. If the channel is closed
1347 /// this function will return a [`TrySendError::Closed`].
1348 ///
1349 /// Dropping [`PermitIterator`] without consuming it entirely releases the remaining
1350 /// permits back to the channel.
1351 ///
1352 /// [`PermitIterator`]: PermitIterator
1353 /// [`send`]: Permit::send
1354 /// [`reserve_many`]: Sender::reserve_many
1355 ///
1356 /// # Examples
1357 ///
1358 /// ```
1359 /// use tokio::sync::mpsc;
1360 ///
1361 /// # #[tokio::main(flavor = "current_thread")]
1362 /// # async fn main() {
1363 /// let (tx, mut rx) = mpsc::channel(2);
1364 ///
1365 /// // Reserve capacity
1366 /// let mut permit = tx.try_reserve_many(2).unwrap();
1367 ///
1368 /// // Trying to send directly on the `tx` will fail due to no
1369 /// // available capacity.
1370 /// assert!(tx.try_send(123).is_err());
1371 ///
1372 /// // Trying to reserve an additional slot on the `tx` will
1373 /// // fail because there is no capacity.
1374 /// assert!(tx.try_reserve().is_err());
1375 ///
1376 /// // Sending with the permit iterator succeeds
1377 /// permit.next().unwrap().send(456);
1378 /// permit.next().unwrap().send(457);
1379 ///
1380 /// // The iterator should now be exhausted
1381 /// assert!(permit.next().is_none());
1382 ///
1383 /// // The value sent on the permit is received
1384 /// assert_eq!(rx.recv().await.unwrap(), 456);
1385 /// assert_eq!(rx.recv().await.unwrap(), 457);
1386 ///
1387 /// // Trying to call try_reserve_many with 0 will return an empty iterator
1388 /// let mut permit = tx.try_reserve_many(0).unwrap();
1389 /// assert!(permit.next().is_none());
1390 ///
1391 /// // Trying to call try_reserve_many with a number greater than the channel
1392 /// // capacity will return an error
1393 /// let permit = tx.try_reserve_many(3);
1394 /// assert!(permit.is_err());
1395 ///
1396 /// // Trying to call try_reserve_many on a closed channel will return an error
1397 /// drop(rx);
1398 /// let permit = tx.try_reserve_many(1);
1399 /// assert!(permit.is_err());
1400 ///
1401 /// let permit = tx.try_reserve_many(0);
1402 /// assert!(permit.is_err());
1403 /// # }
1404 /// ```
1405 pub fn try_reserve_many(&self, n: usize) -> Result<PermitIterator<'_, T>, TrySendError<()>> {
1406 if n > self.max_capacity() {
1407 return Err(TrySendError::Full(()));
1408 }
1409
1410 match self.chan.semaphore().semaphore.try_acquire(n) {
1411 Ok(()) => {}
1412 Err(TryAcquireError::Closed) => return Err(TrySendError::Closed(())),
1413 Err(TryAcquireError::NoPermits) => return Err(TrySendError::Full(())),
1414 }
1415
1416 Ok(PermitIterator {
1417 chan: &self.chan,
1418 n,
1419 })
1420 }
1421
1422 /// Tries to acquire a slot in the channel without waiting for the slot to become
1423 /// available, returning an owned permit.
1424 ///
1425 /// This moves the sender _by value_, and returns an owned permit that can
1426 /// be used to send a message into the channel. Unlike [`Sender::try_reserve`],
1427 /// this method may be used in cases where the permit must be valid for the
1428 /// `'static` lifetime. `Sender`s may be cloned cheaply (`Sender::clone` is
1429 /// essentially a reference count increment, comparable to [`Arc::clone`]),
1430 /// so when multiple [`OwnedPermit`]s are needed or the `Sender` cannot be
1431 /// moved, it can be cloned prior to calling `try_reserve_owned`.
1432 ///
1433 /// If the channel is full this function will return a [`TrySendError`].
1434 /// Since the sender is taken by value, the `TrySendError` returned in this
1435 /// case contains the sender, so that it may be used again. Otherwise, if
1436 /// there is a slot available, this method will return an [`OwnedPermit`]
1437 /// that can then be used to [`send`] on the channel with a guaranteed slot.
1438 /// This function is similar to [`reserve_owned`] except it does not await
1439 /// for the slot to become available.
1440 ///
1441 /// Dropping the [`OwnedPermit`] without sending a message releases the capacity back
1442 /// to the channel.
1443 ///
1444 /// [`OwnedPermit`]: OwnedPermit
1445 /// [`send`]: OwnedPermit::send
1446 /// [`reserve_owned`]: Sender::reserve_owned
1447 /// [`Arc::clone`]: std::sync::Arc::clone
1448 ///
1449 /// # Examples
1450 ///
1451 /// ```
1452 /// use tokio::sync::mpsc;
1453 ///
1454 /// # #[tokio::main(flavor = "current_thread")]
1455 /// # async fn main() {
1456 /// let (tx, mut rx) = mpsc::channel(1);
1457 ///
1458 /// // Reserve capacity
1459 /// let permit = tx.clone().try_reserve_owned().unwrap();
1460 ///
1461 /// // Trying to send directly on the `tx` will fail due to no
1462 /// // available capacity.
1463 /// assert!(tx.try_send(123).is_err());
1464 ///
1465 /// // Trying to reserve an additional slot on the `tx` will
1466 /// // fail because there is no capacity.
1467 /// assert!(tx.try_reserve().is_err());
1468 ///
1469 /// // Sending on the permit succeeds
1470 /// permit.send(456);
1471 ///
1472 /// // The value sent on the permit is received
1473 /// assert_eq!(rx.recv().await.unwrap(), 456);
1474 ///
1475 /// # }
1476 /// ```
1477 pub fn try_reserve_owned(self) -> Result<OwnedPermit<T>, TrySendError<Self>> {
1478 match self.chan.semaphore().semaphore.try_acquire(1) {
1479 Ok(()) => {}
1480 Err(TryAcquireError::Closed) => return Err(TrySendError::Closed(self)),
1481 Err(TryAcquireError::NoPermits) => return Err(TrySendError::Full(self)),
1482 }
1483
1484 Ok(OwnedPermit {
1485 chan: Some(self.chan),
1486 })
1487 }
1488
1489 /// Returns `true` if senders belong to the same channel.
1490 ///
1491 /// # Examples
1492 ///
1493 /// ```
1494 /// let (tx, rx) = tokio::sync::mpsc::channel::<()>(1);
1495 /// let tx2 = tx.clone();
1496 /// assert!(tx.same_channel(&tx2));
1497 ///
1498 /// let (tx3, rx3) = tokio::sync::mpsc::channel::<()>(1);
1499 /// assert!(!tx3.same_channel(&tx2));
1500 /// ```
    pub fn same_channel(&self, other: &Self) -> bool {
        // The comparison itself is carried out by the inner channel handles.
        self.chan.same_channel(&other.chan)
    }
1504
1505 /// Returns the current capacity of the channel.
1506 ///
1507 /// The capacity goes down when sending a value by calling [`send`] or by reserving capacity
1508 /// with [`reserve`]. The capacity goes up when values are received by the [`Receiver`].
    /// This is distinct from [`max_capacity`], which always returns the buffer capacity
    /// initially specified when calling [`channel`].
1511 ///
1512 /// # Examples
1513 ///
1514 /// ```
1515 /// use tokio::sync::mpsc;
1516 ///
1517 /// # #[tokio::main(flavor = "current_thread")]
1518 /// # async fn main() {
1519 /// let (tx, mut rx) = mpsc::channel::<()>(5);
1520 ///
1521 /// assert_eq!(tx.capacity(), 5);
1522 ///
1523 /// // Making a reservation drops the capacity by one.
1524 /// let permit = tx.reserve().await.unwrap();
1525 /// assert_eq!(tx.capacity(), 4);
1526 ///
1527 /// // Sending and receiving a value increases the capacity by one.
1528 /// permit.send(());
1529 /// rx.recv().await.unwrap();
1530 /// assert_eq!(tx.capacity(), 5);
1531 /// # }
1532 /// ```
1533 ///
1534 /// [`send`]: Sender::send
1535 /// [`reserve`]: Sender::reserve
1536 /// [`channel`]: channel
1537 /// [`max_capacity`]: Sender::max_capacity
1538 pub fn capacity(&self) -> usize {
1539 self.chan.semaphore().semaphore.available_permits()
1540 }
1541
1542 /// Converts the `Sender` to a [`WeakSender`] that does not count
1543 /// towards RAII semantics, i.e. if all `Sender` instances of the
1544 /// channel were dropped and only `WeakSender` instances remain,
1545 /// the channel is closed.
1546 #[must_use = "Downgrade creates a WeakSender without destroying the original non-weak sender."]
1547 pub fn downgrade(&self) -> WeakSender<T> {
1548 WeakSender {
1549 chan: self.chan.downgrade(),
1550 }
1551 }
1552
1553 /// Returns the maximum buffer capacity of the channel.
1554 ///
1555 /// The maximum capacity is the buffer capacity initially specified when calling
1556 /// [`channel`]. This is distinct from [`capacity`], which returns the *current*
1557 /// available buffer capacity: as messages are sent and received, the
1558 /// value returned by [`capacity`] will go up or down, whereas the value
1559 /// returned by [`max_capacity`] will remain constant.
1560 ///
1561 /// # Examples
1562 ///
1563 /// ```
1564 /// use tokio::sync::mpsc;
1565 ///
1566 /// # #[tokio::main(flavor = "current_thread")]
1567 /// # async fn main() {
1568 /// let (tx, _rx) = mpsc::channel::<()>(5);
1569 ///
1570 /// // both max capacity and capacity are the same at first
1571 /// assert_eq!(tx.max_capacity(), 5);
1572 /// assert_eq!(tx.capacity(), 5);
1573 ///
1574 /// // Making a reservation doesn't change the max capacity.
1575 /// let permit = tx.reserve().await.unwrap();
1576 /// assert_eq!(tx.max_capacity(), 5);
1577 /// // but drops the capacity by one
1578 /// assert_eq!(tx.capacity(), 4);
1579 /// # }
1580 /// ```
1581 ///
1582 /// [`channel`]: channel
1583 /// [`max_capacity`]: Sender::max_capacity
1584 /// [`capacity`]: Sender::capacity
1585 pub fn max_capacity(&self) -> usize {
1586 self.chan.semaphore().bound
1587 }
1588
1589 /// Returns the number of [`Sender`] handles.
    pub fn strong_count(&self) -> usize {
        // Delegates to the counter kept by the inner channel handle.
        self.chan.strong_count()
    }
1593
1594 /// Returns the number of [`WeakSender`] handles.
    pub fn weak_count(&self) -> usize {
        // Delegates to the counter kept by the inner channel handle.
        self.chan.weak_count()
    }
1598}
1599
1600impl<T> Clone for Sender<T> {
1601 fn clone(&self) -> Self {
1602 Sender {
1603 chan: self.chan.clone(),
1604 }
1605 }
1606}
1607
1608impl<T> fmt::Debug for Sender<T> {
1609 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1610 fmt.debug_struct("Sender")
1611 .field("chan", &self.chan)
1612 .finish()
1613 }
1614}
1615
1616impl<T> Clone for WeakSender<T> {
1617 fn clone(&self) -> Self {
1618 self.chan.increment_weak_count();
1619
1620 WeakSender {
1621 chan: self.chan.clone(),
1622 }
1623 }
1624}
1625
impl<T> Drop for WeakSender<T> {
    fn drop(&mut self) {
        // Counterpart of the increment performed when a `WeakSender` is cloned:
        // this handle no longer counts as a weak reference.
        self.chan.decrement_weak_count();
    }
}
1631
1632impl<T> WeakSender<T> {
1633 /// Tries to convert a `WeakSender` into a [`Sender`]. This will return `Some`
1634 /// if there are other `Sender` instances alive and the channel wasn't
1635 /// previously dropped, otherwise `None` is returned.
1636 pub fn upgrade(&self) -> Option<Sender<T>> {
1637 chan::Tx::upgrade(self.chan.clone()).map(Sender::new)
1638 }
1639
1640 /// Returns the number of [`Sender`] handles.
1641 pub fn strong_count(&self) -> usize {
1642 self.chan.strong_count()
1643 }
1644
1645 /// Returns the number of [`WeakSender`] handles.
1646 pub fn weak_count(&self) -> usize {
1647 self.chan.weak_count()
1648 }
1649}
1650
1651impl<T> fmt::Debug for WeakSender<T> {
1652 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1653 fmt.debug_struct("WeakSender").finish()
1654 }
1655}
1656
1657// ===== impl Permit =====
1658
1659impl<T> Permit<'_, T> {
1660 /// Sends a value using the reserved capacity.
1661 ///
1662 /// Capacity for the message has already been reserved. The message is sent
1663 /// to the receiver and the permit is consumed. The operation will succeed
1664 /// even if the receiver half has been closed. See [`Receiver::close`] for
1665 /// more details on performing a clean shutdown.
1666 ///
1667 /// [`Receiver::close`]: Receiver::close
1668 ///
1669 /// # Examples
1670 ///
1671 /// ```
1672 /// use tokio::sync::mpsc;
1673 ///
1674 /// # #[tokio::main(flavor = "current_thread")]
1675 /// # async fn main() {
1676 /// let (tx, mut rx) = mpsc::channel(1);
1677 ///
1678 /// // Reserve capacity
1679 /// let permit = tx.reserve().await.unwrap();
1680 ///
1681 /// // Trying to send directly on the `tx` will fail due to no
1682 /// // available capacity.
1683 /// assert!(tx.try_send(123).is_err());
1684 ///
1685 /// // Send a message on the permit
1686 /// permit.send(456);
1687 ///
1688 /// // The value sent on the permit is received
1689 /// assert_eq!(rx.recv().await.unwrap(), 456);
1690 /// # }
1691 /// ```
1692 pub fn send(self, value: T) {
1693 use std::mem;
1694
1695 self.chan.send(value);
1696
1697 // Avoid the drop logic
1698 mem::forget(self);
1699 }
1700}
1701
1702impl<T> Drop for Permit<'_, T> {
1703 fn drop(&mut self) {
1704 use chan::Semaphore;
1705
1706 let semaphore = self.chan.semaphore();
1707
1708 // Add the permit back to the semaphore
1709 semaphore.add_permit();
1710
1711 // If this is the last sender for this channel, wake the receiver so
1712 // that it can be notified that the channel is closed.
1713 if semaphore.is_closed() && semaphore.is_idle() {
1714 self.chan.wake_rx();
1715 }
1716 }
1717}
1718
1719impl<T> fmt::Debug for Permit<'_, T> {
1720 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1721 fmt.debug_struct("Permit")
1722 .field("chan", &self.chan)
1723 .finish()
1724 }
1725}
1726
1727// ===== impl PermitIterator =====
1728
1729impl<'a, T> Iterator for PermitIterator<'a, T> {
1730 type Item = Permit<'a, T>;
1731
1732 fn next(&mut self) -> Option<Self::Item> {
1733 if self.n == 0 {
1734 return None;
1735 }
1736
1737 self.n -= 1;
1738 Some(Permit { chan: self.chan })
1739 }
1740
1741 fn size_hint(&self) -> (usize, Option<usize>) {
1742 let n = self.n;
1743 (n, Some(n))
1744 }
1745}
// `size_hint` reports `(n, Some(n))`, so the remaining length is exact.
impl<T> ExactSizeIterator for PermitIterator<'_, T> {}
// `next` never increments `n`, so once it returns `None` it keeps doing so.
impl<T> std::iter::FusedIterator for PermitIterator<'_, T> {}
1748
1749impl<T> Drop for PermitIterator<'_, T> {
1750 fn drop(&mut self) {
1751 use chan::Semaphore;
1752
1753 if self.n == 0 {
1754 return;
1755 }
1756
1757 let semaphore = self.chan.semaphore();
1758
1759 // Add the remaining permits back to the semaphore
1760 semaphore.add_permits(self.n);
1761
1762 // If this is the last sender for this channel, wake the receiver so
1763 // that it can be notified that the channel is closed.
1764 if semaphore.is_closed() && semaphore.is_idle() {
1765 self.chan.wake_rx();
1766 }
1767 }
1768}
1769
1770impl<T> fmt::Debug for PermitIterator<'_, T> {
1771 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1772 fmt.debug_struct("PermitIterator")
1773 .field("chan", &self.chan)
1774 .field("capacity", &self.n)
1775 .finish()
1776 }
1777}
1778
// ===== impl OwnedPermit =====
1780
1781impl<T> OwnedPermit<T> {
1782 /// Sends a value using the reserved capacity.
1783 ///
1784 /// Capacity for the message has already been reserved. The message is sent
1785 /// to the receiver and the permit is consumed. The operation will succeed
1786 /// even if the receiver half has been closed. See [`Receiver::close`] for
1787 /// more details on performing a clean shutdown.
1788 ///
1789 /// Unlike [`Permit::send`], this method returns the [`Sender`] from which
1790 /// the `OwnedPermit` was reserved.
1791 ///
1792 /// [`Receiver::close`]: Receiver::close
1793 ///
1794 /// # Examples
1795 ///
1796 /// ```
1797 /// use tokio::sync::mpsc;
1798 ///
1799 /// # #[tokio::main(flavor = "current_thread")]
1800 /// # async fn main() {
1801 /// let (tx, mut rx) = mpsc::channel(1);
1802 ///
1803 /// // Reserve capacity
1804 /// let permit = tx.reserve_owned().await.unwrap();
1805 ///
1806 /// // Send a message on the permit, returning the sender.
1807 /// let tx = permit.send(456);
1808 ///
1809 /// // The value sent on the permit is received
1810 /// assert_eq!(rx.recv().await.unwrap(), 456);
1811 ///
1812 /// // We may now reuse `tx` to send another message.
1813 /// tx.send(789).await.unwrap();
1814 /// # }
1815 /// ```
1816 pub fn send(mut self, value: T) -> Sender<T> {
1817 let chan = self.chan.take().unwrap_or_else(|| {
1818 unreachable!("OwnedPermit channel is only taken when the permit is moved")
1819 });
1820 chan.send(value);
1821
1822 Sender { chan }
1823 }
1824
1825 /// Releases the reserved capacity *without* sending a message, returning the
1826 /// [`Sender`].
1827 ///
1828 /// # Examples
1829 ///
1830 /// ```
1831 /// use tokio::sync::mpsc;
1832 ///
1833 /// # #[tokio::main(flavor = "current_thread")]
1834 /// # async fn main() {
1835 /// let (tx, rx) = mpsc::channel(1);
1836 ///
1837 /// // Clone the sender and reserve capacity
1838 /// let permit = tx.clone().reserve_owned().await.unwrap();
1839 ///
1840 /// // Trying to send on the original `tx` will fail, since the `permit`
1841 /// // has reserved all the available capacity.
1842 /// assert!(tx.try_send(123).is_err());
1843 ///
1844 /// // Release the permit without sending a message, returning the clone
1845 /// // of the sender.
1846 /// let tx2 = permit.release();
1847 ///
1848 /// // We may now reuse `tx` to send another message.
1849 /// tx.send(789).await.unwrap();
1850 /// # drop(rx); drop(tx2);
1851 /// # }
1852 /// ```
1853 ///
1854 /// [`Sender`]: Sender
1855 pub fn release(mut self) -> Sender<T> {
1856 use chan::Semaphore;
1857
1858 let chan = self.chan.take().unwrap_or_else(|| {
1859 unreachable!("OwnedPermit channel is only taken when the permit is moved")
1860 });
1861
1862 // Add the permit back to the semaphore
1863 chan.semaphore().add_permit();
1864 Sender { chan }
1865 }
1866
1867 /// Returns `true` if permits belong to the same channel.
1868 ///
1869 /// # Examples
1870 ///
1871 /// ```
1872 /// use tokio::sync::mpsc;
1873 ///
1874 /// # #[tokio::main(flavor = "current_thread")]
1875 /// # async fn main() {
1876 /// let (tx, rx) = mpsc::channel::<()>(2);
1877 ///
1878 /// let permit1 = tx.clone().reserve_owned().await.unwrap();
1879 /// let permit2 = tx.clone().reserve_owned().await.unwrap();
1880 /// assert!(permit1.same_channel(&permit2));
1881 ///
1882 /// let (tx2, rx2) = mpsc::channel::<()>(1);
1883 ///
1884 /// let permit3 = tx2.clone().reserve_owned().await.unwrap();
1885 /// assert!(!permit3.same_channel(&permit2));
1886 /// # }
1887 /// ```
1888 pub fn same_channel(&self, other: &Self) -> bool {
1889 self.chan
1890 .as_ref()
1891 .zip(other.chan.as_ref())
1892 .is_some_and(|(a, b)| a.same_channel(b))
1893 }
1894
1895 /// Returns `true` if this permit belongs to the same channel as the given [`Sender`].
1896 ///
1897 /// # Examples
1898 ///
1899 /// ```
1900 /// use tokio::sync::mpsc;
1901 ///
1902 /// # #[tokio::main(flavor = "current_thread")]
1903 /// # async fn main() {
1904 /// let (tx, rx) = mpsc::channel::<()>(1);
1905 ///
1906 /// let permit = tx.clone().reserve_owned().await.unwrap();
1907 /// assert!(permit.same_channel_as_sender(&tx));
1908 ///
1909 /// let (tx2, rx2) = mpsc::channel::<()>(1);
1910 /// assert!(!permit.same_channel_as_sender(&tx2));
1911 /// # }
1912 /// ```
1913 pub fn same_channel_as_sender(&self, sender: &Sender<T>) -> bool {
1914 self.chan
1915 .as_ref()
1916 .is_some_and(|chan| chan.same_channel(&sender.chan))
1917 }
1918}
1919
1920impl<T> Drop for OwnedPermit<T> {
1921 fn drop(&mut self) {
1922 use chan::Semaphore;
1923
1924 // Are we still holding onto the sender?
1925 if let Some(chan) = self.chan.take() {
1926 let semaphore = chan.semaphore();
1927
1928 // Add the permit back to the semaphore
1929 semaphore.add_permit();
1930
1931 // If this `OwnedPermit` is holding the last sender for this
1932 // channel, wake the receiver so that it can be notified that the
1933 // channel is closed.
1934 if semaphore.is_closed() && semaphore.is_idle() {
1935 chan.wake_rx();
1936 }
1937 }
1938
1939 // Otherwise, do nothing.
1940 }
1941}
1942
1943impl<T> fmt::Debug for OwnedPermit<T> {
1944 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
1945 fmt.debug_struct("OwnedPermit")
1946 .field("chan", &self.chan)
1947 .finish()
1948 }
1949}